package ins.framework.kafkastream.interactive;

import ins.framework.kafkastream.CommonUtil;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Arrays;
import java.util.Map;

public class DemoInit {
    final static Serde<String> stringSerde = Serdes.String();
    public static void main(String[] args) {

    }
    public static void queryLocalStore(){
        Map<String, Object> configer = CommonUtil.connection("queryLocal_group","10.10.1.7");
        final StreamsBuilder builder = new StreamsBuilder();
        StreamsConfig streamsConfig = new StreamsConfig(configer);
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        // Get the key-value store CountsKeyValueStore
        ReadOnlyKeyValueStore<String, Long> keyValueStore =
                streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());

// Get value by key
        System.out.println("count for hello:" + keyValueStore.get("hello"));

// Get the values for a range of keys available in this application instance
//        KeyValueIterator<String, Long> range = keyValueStore.range("all", "streams");
//        while (range.hasNext()) {
//            KeyValue<String, Long> next = range.next();
//            System.out.println("count for " + next.key + ": " + next.value);
//        }
// close the iterator to release resources
//        range.close();

// Get the values for all of the keys available in this application instance
        KeyValueIterator<String, Long> range = keyValueStore.all();
        while (range.hasNext()) {
            KeyValue<String, Long> next = range.next();
            System.out.println("count for " + next.key + ": " + next.value);
        }
// close the iterator to release resources
        range.close();
    }
    public static void createStore(){
        Map<String, Object> configer = CommonUtil.connection("queryLocal_group","10.10.1.7");
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> textLines = builder.stream("streamtopic3p");
        // Define the processing topology (here: WordCount)
        KGroupedStream<String, String> groupedByWord = textLines
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));

// Create a key-value store named "CountsKeyValueStore" for the all-time word counts
        groupedByWord.count(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("CountsKeyValueStore"));

// Start an instance of the topology
        StreamsConfig streamsConfig = new StreamsConfig(configer);
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
        streams.start();
    }
}
